{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Python 3.7.0 (default, Sep 13 2018, 20:15:46) \n",
      "[GCC 7.3.0]\n"
     ]
    }
   ],
   "source": [
    "import sys\n",
    "print('Python {}'.format(sys.version))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<cfxdb.schema.Schema object at 0x7f1438491eb8>\n"
     ]
    }
   ],
   "source": [
    "# import crossbar-zlmdb and open event history example database\n",
    "import zlmdb\n",
    "from cfxdb import Schema\n",
    "\n",
    "# when running notebook inside docker\n",
    "DBFILE = '../testdb'\n",
    "\n",
    "# when running notebook directly on host\n",
    "DBFILE = '../../crossbar/.testdb'\n",
    "\n",
    "db = zlmdb.Database(DBFILE, maxsize=2**30, readonly=True)\n",
    "\n",
    "schema = Schema.attach(db)\n",
    "\n",
    "print(schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "66339941290\n",
      "com.example.geoservice.alert.5.24\n",
      "2475181101487056\n",
      "[{'category': 'alert', 'x': 5, 'y': 24, 'value1': b'\\x15H\\xb6&3a\\x80\\xe6m!n\\x85@c\\x80\\x01', 'value2': 0.6736680899470212, 'value3': 'SJVW-YP7W-3K6A', 'i': 29952, 'j': 2}, 29952]\n",
      "{'value3': 'SJVW-YP7W-3K6A'}\n",
      "None\n",
      "7M7C-JSKK-9WRN-99HN-W6MF-YVCH\n",
      "1540491042079502723\n",
      "0\n"
     ]
    }
   ],
   "source": [
    "# get a single event from event history by publication ID\n",
    "\n",
    "with db.begin() as txn:\n",
    "    # get the first event ID\n",
    "    for pub in schema.publications.select(txn, limit=1, return_keys=False):\n",
    "        print(pub.publication)\n",
    "        print(pub.topic)\n",
    "        print(pub.publisher)\n",
    "        print(pub.args)\n",
    "        print(pub.kwargs)\n",
    "        print(pub.payload)\n",
    "        #print(pub.marshal())\n",
    "        publisher = schema.sessions[txn, pub.publisher]\n",
    "        print(publisher.authid)\n",
    "        print(publisher.joined_at)\n",
    "        print(publisher.left_at)\n",
    "        "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "sessions: 40\n",
      "publications: 87140\n",
      "events: 427547\n"
     ]
    }
   ],
   "source": [
    "# count all events in event history\n",
    "\n",
    "with db.begin() as txn:\n",
    "    cnt = schema.sessions.count(txn)\n",
    "    print('sessions: {}'.format(cnt))\n",
    "\n",
    "    cnt = schema.publications.count(txn)\n",
    "    print('publications: {}'.format(cnt))\n",
    "\n",
    "    cnt = schema.events.count(txn)\n",
    "    print('events: {}'.format(cnt))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'3QXK-QNUP-PMC4-M5YA-TR5G-CG5E': 309,\n",
      " '4HFC-L4HH-HQMR-6PJY-VPHL-YL5M': 168,\n",
      " '6FFF-9F9S-W673-GMQ5-3C99-44A6': 200,\n",
      " '7M7C-JSKK-9WRN-99HN-W6MF-YVCH': 1494,\n",
      " '7MEX-KJ5X-KKAK-Q5KU-9QLU-GYG9': 17,\n",
      " '7YSE-VEN7-7X3R-CFHM-3T55-993P': 290,\n",
      " 'A3TR-MT64-4NT6-EJJ6-XTRW-J7AT': 168,\n",
      " 'ECFF-T7NL-5NS9-ERF6-7TMC-CF4E': 328,\n",
      " 'ECYQ-J5F3-XJHW-4T3H-QHX6-U64N': 1511,\n",
      " 'FA7X-975X-TTGN-CAKT-APE9-F6FA': 200,\n",
      " 'HUNU-7FEC-M7WS-4XM9-UNTA-7QWV': 1512,\n",
      " 'L6WQ-53UX-CEKX-WXU4-HFJU-5HRM': 325,\n",
      " 'QCKR-LQY6-9QG7-RECU-SCVG-YLY7': 1494,\n",
      " 'TKYG-H6FP-SWUT-JVLQ-W4WN-7M3G': 200,\n",
      " 'U4XJ-7VYK-4NMM-QS4M-7PTE-THS3': 290,\n",
      " 'WMUT-W7YR-H476-649Q-54VE-KAXT': 1494}\n"
     ]
    }
   ],
   "source": [
    "# count all events, grouped by authid\n",
    "\n",
    "from pprint import pprint\n",
    "authids = {}\n",
    "\n",
    "with db.begin() as txn:\n",
    "    for evt in schema.events.select(txn, limit=10000, return_keys=False):\n",
    "        rec = schema.sessions[txn, evt.receiver]\n",
    "        if rec.authid not in authids:\n",
    "            authids[rec.authid] = 0\n",
    "        authids[rec.authid] += 1\n",
    "\n",
    "pprint(authids)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "54329 records in 0.38 secs (143500 records per sec)\n"
     ]
    }
   ],
   "source": [
    "import time\n",
    "\n",
    "def doit(verbose=False):\n",
    "    res = {}\n",
    "    total = 0\n",
    "    sum = 0\n",
    "\n",
    "    with db.begin() as txn:\n",
    "        i = 0\n",
    "        for evt in schema.publications.select(txn, limit=1000000-1, return_keys=False):\n",
    "            if evt.topic.startswith('com.example.geoservice.'):\n",
    "                e = evt.args[0]\n",
    "                x, y, category = e['x'], e['y'], e['category']\n",
    "                value1, value2, value3 = e['value1'], e['value2'], e['value3']\n",
    "                d = (x, y)\n",
    "                if d not in res:\n",
    "                    res[d] = (0, 0)\n",
    "                res[d] = (res[d][0] + 1, res[d][1] + value2)\n",
    "                total += 1\n",
    "                \n",
    "                if False:\n",
    "                    if i < 10 or res[d][0] > 14:\n",
    "                        print('({}, {}): {}. category=\"{}\"  topic=\"{}\"'.format(x, y, res[d], category, evt.topic))\n",
    "                    elif i == 10:\n",
    "                        print('...')\n",
    "                i += 1\n",
    "    if verbose:\n",
    "        pprint(res)\n",
    "    return total\n",
    "\n",
    "started = time.perf_counter()\n",
    "total = doit()\n",
    "ended = time.perf_counter()\n",
    "recs_per_sec = int(float(total) / (ended - started))\n",
    "print('{} records in {:.2} secs ({} records per sec)'.format(total, ended - started, recs_per_sec))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "publishers:\n",
      "{2361200170397101: 4993, 2475181101487056: 41567, 8760936425110809: 7769}\n",
      "receivers:\n",
      "{825130965066635: 4982,\n",
      " 840686762535787: 7252,\n",
      " 1099415647834027: 4923,\n",
      " 1353882293476443: 470,\n",
      " 2277162720874418: 7678,\n",
      " 2475181101487056: 41027,\n",
      " 2608742549618717: 41026,\n",
      " 4580631653116131: 4216,\n",
      " 4880811124571864: 4216,\n",
      " 5932342410277093: 41026,\n",
      " 6192714356302635: 6939,\n",
      " 7488114861245392: 41497,\n",
      " 7646455278784097: 41566,\n",
      " 7983575357203379: 6939,\n",
      " 8760936425110809: 7759,\n",
      " 8942188816060976: 4913}\n",
      "54329 records in 0.4 secs (137330 records per sec)\n"
     ]
    }
   ],
   "source": [
    "from pprint import pprint\n",
    "\n",
    "def doit(publishers, receivers):\n",
    "    with db.begin() as txn:\n",
    "        for pub in schema.publications.select(txn, limit=1000000-1, return_keys=False):\n",
    "            if pub.publisher not in publishers:\n",
    "                publishers[pub.publisher] = 0\n",
    "            publishers[pub.publisher] += 1\n",
    "            \n",
    "            from_key = (pub.publication, 0, 0)\n",
    "            to_key = (pub.publication + 1, 0, 0)\n",
    "            for evt in schema.events.select(txn, from_key=from_key, to_key=to_key, return_keys=False):\n",
    "                if evt.receiver not in receivers:\n",
    "                    receivers[evt.receiver] = 0\n",
    "                receivers[evt.receiver] += 1\n",
    "            \n",
    "    total = 0\n",
    "    for x in publishers.values():\n",
    "        total += x\n",
    "    return total\n",
    "\n",
    "publishers = {}\n",
    "receivers = {}\n",
    "\n",
    "started = time.perf_counter()\n",
    "total = doit(publishers, receivers)\n",
    "ended = time.perf_counter()\n",
    "recs_per_sec = int(float(total) / (ended - started))\n",
    "\n",
    "print('publishers:')\n",
    "pprint(publishers)\n",
    "\n",
    "print('receivers:')\n",
    "pprint(receivers)\n",
    "\n",
    "print('{} records in {:.2} secs ({} records per sec)'.format(total, ended - started, recs_per_sec))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "54329\n"
     ]
    }
   ],
   "source": [
    "total = 0\n",
    "\n",
    "with db.begin() as txn:\n",
    "    for evt in schema.publications.select(txn, limit=100000, return_keys=False):\n",
    "        if evt.topic.startswith('com.example.geoservice.'):\n",
    "            total += 1\n",
    "\n",
    "print(total)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{(0, 0): 3237,\n",
      " (0, 1): 3338,\n",
      " (0, 2): 3384,\n",
      " (0, 3): 3312,\n",
      " (0, 4): 129,\n",
      " (1, 0): 3305,\n",
      " (1, 1): 3415,\n",
      " (1, 2): 3342,\n",
      " (1, 3): 3346,\n",
      " (1, 4): 140,\n",
      " (2, 0): 3281,\n",
      " (2, 1): 3394,\n",
      " (2, 2): 3225,\n",
      " (2, 3): 3261,\n",
      " (2, 4): 141,\n",
      " (3, 0): 3422,\n",
      " (3, 1): 3345,\n",
      " (3, 2): 3301,\n",
      " (3, 3): 3318,\n",
      " (3, 4): 136,\n",
      " (4, 0): 161,\n",
      " (4, 1): 123,\n",
      " (4, 2): 129,\n",
      " (4, 3): 140,\n",
      " (4, 4): 4}\n",
      "{'ad': 10897, 'alert': 10900, 'info': 10856, 'other': 10948, 'warning': 10728}\n",
      "54329 records in 0.09 secs (605889 records per sec)\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "605889"
      ]
     },
     "execution_count": 27,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import re\n",
    "import timeit\n",
    "\n",
    "def test2(verbose=False):\n",
    "    pat = re.compile(r'^com\\.example\\.geoservice\\.([a-z]+)\\.([0-9]+).([0-9]+)$')\n",
    "\n",
    "    def trunc(val):\n",
    "        return int(val / 25)\n",
    "\n",
    "    started = time.perf_counter()\n",
    "    total = 0\n",
    "\n",
    "    res1 = {}\n",
    "    res2 = {}\n",
    "    with db.begin() as txn:\n",
    "        for pub in schema.publications.select(txn, limit=1000000-1, return_keys=False):\n",
    "            total += 1\n",
    "            m = pat.match(pub.topic)\n",
    "            if m:\n",
    "                category, x, y = m.groups()\n",
    "\n",
    "                key = (trunc(int(x)), trunc(int(y)))\n",
    "                if key not in res1:\n",
    "                    res1[key] = 0\n",
    "                res1[key] += 1\n",
    "\n",
    "                if category not in res2:\n",
    "                    res2[category] = 0\n",
    "                res2[category] += 1\n",
    "\n",
    "    ended = time.perf_counter()\n",
    "\n",
    "    if verbose:\n",
    "        pprint(res1)\n",
    "        pprint(res2)\n",
    "\n",
    "    recs_per_sec = int(float(total) / (ended - started))\n",
    "    print('{} records in {:.2} secs ({} records per sec)'.format(total, ended - started, recs_per_sec))\n",
    "\n",
    "    return recs_per_sec\n",
    "\n",
    "test2(verbose=True)\n",
    "#timeit.timeit(test2, number=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'3QXK-QNUP-PMC4-M5YA-TR5G-CG5E': 7252,\n",
      " '4HFC-L4HH-HQMR-6PJY-VPHL-YL5M': 4216,\n",
      " '6FFF-9F9S-W673-GMQ5-3C99-44A6': 4982,\n",
      " '7M7C-JSKK-9WRN-99HN-W6MF-YVCH': 41027,\n",
      " '7MEX-KJ5X-KKAK-Q5KU-9QLU-GYG9': 470,\n",
      " '7YSE-VEN7-7X3R-CFHM-3T55-993P': 6939,\n",
      " 'A3TR-MT64-4NT6-EJJ6-XTRW-J7AT': 4216,\n",
      " 'ECFF-T7NL-5NS9-ERF6-7TMC-CF4E': 7759,\n",
      " 'ECYQ-J5F3-XJHW-4T3H-QHX6-U64N': 41497,\n",
      " 'FA7X-975X-TTGN-CAKT-APE9-F6FA': 4913,\n",
      " 'HUNU-7FEC-M7WS-4XM9-UNTA-7QWV': 41566,\n",
      " 'L6WQ-53UX-CEKX-WXU4-HFJU-5HRM': 7678,\n",
      " 'QCKR-LQY6-9QG7-RECU-SCVG-YLY7': 41026,\n",
      " 'TKYG-H6FP-SWUT-JVLQ-W4WN-7M3G': 4923,\n",
      " 'U4XJ-7VYK-4NMM-QS4M-7PTE-THS3': 6939,\n",
      " 'WMUT-W7YR-H476-649Q-54VE-KAXT': 41026}\n",
      "266429 records in 2.7 secs (96999 records per sec)\n",
      "266429 records in 3.0 secs (88332 records per sec)\n",
      "266429 records in 2.7 secs (97686 records per sec)\n",
      "266429 records in 2.8 secs (95656 records per sec)\n",
      "266429 records in 2.8 secs (95715 records per sec)\n",
      "266429 records in 2.8 secs (94560 records per sec)\n",
      "266429 records in 2.8 secs (94794 records per sec)\n",
      "266429 records in 2.9 secs (92736 records per sec)\n",
      "266429 records in 2.9 secs (91915 records per sec)\n",
      "266429 records in 2.9 secs (91725 records per sec)\n",
      "266429 records in 2.9 secs (90601 records per sec)\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "28.558505331980996"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import timeit\n",
    "import time\n",
    "from pprint import pprint\n",
    "\n",
    "def test3(verbose=False):\n",
    "    started = time.perf_counter()\n",
    "    total = 0\n",
    "\n",
    "    res = {}\n",
    "    final = {}\n",
    "    with db.begin() as txn:\n",
    "        for evt in schema.events.select(txn, limit=1000000-1, return_keys=False):\n",
    "            total += 1\n",
    "            if evt.receiver not in res:\n",
    "                res[evt.receiver] = 0\n",
    "            res[evt.receiver] += 1\n",
    "\n",
    "        for receiver in res:\n",
    "            session = schema.sessions[txn, receiver]\n",
    "            final[session.authid] = res[receiver]\n",
    "\n",
    "    ended = time.perf_counter()\n",
    "    recs_per_sec = int(float(total) / (ended - started))\n",
    "\n",
    "    if verbose:\n",
    "        pprint(final)\n",
    "\n",
    "    print('{} records in {:.2} secs ({} records per sec)'.format(total, ended - started, recs_per_sec))\n",
    "\n",
    "    return recs_per_sec\n",
    "\n",
    "test3(verbose=True)\n",
    "timeit.timeit(test3, number=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%autoawait asyncio"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "import aiohttp\n",
    "import asyncio\n",
    "\n",
    "async def fetch(session, url):\n",
    "    async with session.get(url) as response:\n",
    "        return await response.text()\n",
    "\n",
    "async with aiohttp.ClientSession() as session:\n",
    "    html = await fetch(session, 'https://crossbar.io')\n",
    "    print(html.find('Crossbar'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from autobahn.asyncio.component import Component, run\n",
    "from autobahn.wamp.types import RegisterOptions\n",
    "\n",
    "import asyncio\n",
    "\n",
    "\n",
    "component = Component(\n",
    "    transports=[\n",
    "        {\n",
    "            \"type\": \"websocket\",\n",
    "            \"url\": \"ws://localhost:8080/ws\",\n",
    "            #\"url\": \"ws://crossbar:8080/ws\",\n",
    "            \"endpoint\": {\n",
    "                \"type\": \"tcp\",\n",
    "                \"host\": \"localhost\",\n",
    "                #\"host\": \"crossbar\",\n",
    "                \"port\": 8080\n",
    "            }\n",
    "        },\n",
    "    ],\n",
    "    realm=\"realm1\",\n",
    ")\n",
    "\n",
    "\n",
    "async def add2(x, y, details):\n",
    "    print(\"add2(x={}, y={}, details={})\".format(x, y, details))\n",
    "    return x + y\n",
    "\n",
    "\n",
    "@component.on_join\n",
    "async def join(session, details):\n",
    "    print(\"joined {}\".format(details))\n",
    "    await session.register(add2, u\"foobar.add3\", options=RegisterOptions(details_arg='details'))\n",
    "    print(\"component ready!\")\n",
    "\n",
    "    # await run([component])\n",
    "\n",
    "await component.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import lmdb\n",
    "\n",
    "#db = lmdb.open('testdb2', readonly=True, subdir=True)\n",
    "db = lmdb.open('../.testdb', readonly=True, subdir=True, lock=False)\n",
    "#db = lmdb.open('.', readonly=False, subdir=True)\n",
    "print(db)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.listdir('../../crossbar/.testdb')\n",
    "#os.listdir('../.testdb')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
